JUC JUC 全称是 java.util.concurrent,作用是在并发编程中使用的工具类
进程的状态 NEW,(新建) RUNNABLE,(准备就绪) BLOCKED,(阻塞) WAITING,(不见不散) TIMED_WAITING,(过时不候) TERMINATED, (终结)
wait/sleep 的区别 功能都是当前线程暂停,区别在于 wait 是放开手去睡,放开手里的锁。sleep 是握紧手去睡,醒了手里还有锁。
并发/并行 并发:同一时刻多个线程在访问同一个资源,多个线程对一个点。例子,小米9今天上午10点,限量抢购,春运抢票,电商秒杀… 并行:多项工作一起执行,之后再汇总。例子,泡方便面,电水壶烧水,一边撕调料倒入桶中
Lock Lock 和 Synchronized 的区别:Lock 实现提供了比使用 Synchronized 方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。而且 Lock 是手动上锁和解锁,Synchronized 则控制不了。
ReentrantLock 可重入锁,Lock 接口的实现,代码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.atguigu.juc;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;class Ticket { private int number = 30 ; private Lock lock = new ReentrantLock(); public void sale () { lock.lock(); try { if (number > 0 ) { System.out.println(Thread.currentThread().getName() + "卖出:" + number-- + "号票,还剩:" + number + "张" ); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class SaleTicket { public static void main (String[] args) { Ticket ticket = new Ticket(); new Thread(()-> {for (int i = 0 ; i < 40 ; i++) ticket.sale();}, "AA" ).start(); new Thread(()-> {for (int i = 0 ; i < 40 ; i++) ticket.sale();}, "BB" ).start(); new Thread(()-> {for (int i = 0 ; i < 40 ; i++) ticket.sale();}, "CC" ).start(); } }
java8特性 函数式接口 在 java.util.function 包下面的都是函数式接口。函数式接口都有 @FunctionalInterface 注解。
内置四大函数式接口 Consumer<T>
: 消费型接口,有参无返回值。范型 T 是参数类型Supplier<T>
: 供给型接口,无参有返回值。范型 T 是返回值类型Function<T, R>
: 函数型接口,有参有返回值。范型 T 是参数类型,R 是返回值类型Predicate<T>
: 断定型接口,有参,返回值为 bool。范型 T 是参数类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.atguigu.juc;import java.util.function.Consumer;import java.util.function.Function;import java.util.function.Predicate;import java.util.function.Supplier;public class FunctionalInterfaceDemo { public static void main (String[] args) { Consumer<String> consumer = t-> { System.out.println(t); }; consumer.accept("ABC" ); Supplier<String> supplier = () -> { return "DEF" ; }; System.out.println(supplier.get()); Function<String, Integer> function = (t) ->{ return t.length(); }; System.out.println(function.apply("zhangsan" )); Predicate<String> predicate = (t) -> { return t.length() > 10 ?true :false ; }; System.out.println(predicate.test("zhangsan" )); } }
Stream 流 Stream 流是数据渠道,用于操作数据源(集合,数组等)。“集合注重的是数据,流注重的是计算“。
特点:
Stream 自己不会存储元素
Stream 不会改变源对象,相反,会返回一个新的持有结果的 Stream 流
Stream 的操作是延迟的,这意味着他们会等到需要结果的时候才执行
例子:
按照给出的数据,找出偶数ID,年龄大于 24,用户名转为大写,用户名字幕倒序,只输出一个用于名字。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package com.atguigu.juc;import java.util.Arrays;import java.util.List;import java.util.stream.Stream;class User { private int id; private String userName; private int age; public int getId () { return id; } public void setId (int id) { this .id = id; } public String getUserName () { return userName; } public void setUserName (String userName) { this .userName = userName; } public int getAge () { return age; } public void setAge (int age) { this .age = age; } public User (int id, String userName, int age) { this .id = id; this .userName = userName; this .age = age; } @Override public String toString () { return "User{" + "id=" + id + ", userName='" + userName + '\'' + ", age=" + age + '}' ; } } public class StreamDemo { public static void main (String[] args) { User u1 = new User(11 , "a" , 23 ); User u2 = new User(12 , "b" , 24 ); User u3 = new User(13 , "c" , 22 ); User u4 = new User(14 , "d" , 28 ); User u5 = new User(16 , "e" , 26 ); List<User> list = Arrays.asList(u1, u2, u3, u4, u5); list.stream().filter((t) -> { return t.getId() % 2 == 0 ; }).filter((t) -> { return t.getAge() > 24 ; }).map((t) -> { return t.getUserName().toUpperCase(); }).sorted((o1, o2) -> { return o2.compareTo(o1); }).limit(1 ).forEach(System.out::println); } }
lambda 表达式 Lambda 是一个匿名函数,我们可以把 Lambda 表达式理解为是一段可以传递的代码(将代码像数据一样进行传递)。可以写出更简洁、更灵活的代码。作为一种更紧凑的代码风格,使Java 的语言表达能力得到了提升。
Lambda 表达式在 Java 语言中引入了一个新的语法元素和操作符。这个操作符为 “->” , 该操作符被称为 Lambda 操作符或剪头操作符。它将 Lambda 分为两个部分,左侧:指定了 Lambda 表达式需要的所有参数,右侧:指定了 Lambda 体,即 Lambda 表达式要执行的功能。
使用 Lambda 表达式的条件是 lambda 表达式,必须是函数式接口,即必须只有一个方法,如果接口只有一个方法 java 默认它为函数式接口。为了正确使用Lambda表达式,需要给接口加个注解:@FunctionalInterface,如有两个方法,立刻报错。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.atguigu.juc;public class LambdaDemo { @FunctionalInterface interface Foo { public int add (int x, int y) ; } public static void main (String[] args) { Foo foo = new Foo() { @Override public int add (int x, int y) { return x + y; } }; System.out.println(foo.add(10 , 5 )); Foo foo1 = (int x, int y) -> { return x + y; }; System.out.println(foo1.add(10 , 5 )); } }
接口里是否能有实现方法 代码地址
在 Java 8 中接口新增了 default 方法和静态方法。即,一个接口中可以有多个 default 方法,也可以有多个静态当法。如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.atguigu.juc;public class LambdaDemo { @FunctionalInterface interface Foo { public int add (int x, int y) ; default int sub (int x, int y) { return x - y; } default int div (int x, int y) { return x / y; } public static int aa (int x, int y) { return x + y + 100 ; } public static int bb (int x, int y) { return x - y - 100 ; } } public static void main (String[] args) { Foo foo = new Foo() { @Override public int add (int x, int y) { return x + y; } }; System.out.println(foo.add(10 , 5 )); Foo foo1 = (int x, int y) -> { return x + y; }; System.out.println(foo1.add(10 , 5 )); System.out.println(foo1.div(10 , 5 )); System.out.println(foo1.sub(10 , 5 )); } }
线程间通信 需求: 四个线程 ABCD。两个线程进行 + 1。另外个线程进行 -1。要求 +1 和 -1 交替进行。每个线程执行 10 次
synchronized 实现 代码地址
注意⚠️:在使用 object 中的 wait 方法的时候,一定要为 wait 使用 while 循环。if 判断只会判断一次,如果使用 if 当线程再次被唤醒的时候,因为执行过判断了,就有可能绕过条件(以前判断过了,认为没问题,但是其实已经不满足条件了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package com.atguigu.juc;public class NotifyWaitDemo { private int number = 0 ; public synchronized void increment () throws InterruptedException { while (number !=0 ) { this .wait(); } number++; System.out.println(Thread.currentThread().getName()+"\t" +number); this .notifyAll(); } public synchronized void decrement () throws InterruptedException { while (number == 0 ) { this .wait(); } number--; System.out.println(Thread.currentThread().getName() + "\t" + number); this .notifyAll(); } public static void main (String[] args) { NotifyWaitDemo notifyWaitDemo = new NotifyWaitDemo(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { notifyWaitDemo.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程A" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { notifyWaitDemo.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程B" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { notifyWaitDemo.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程C" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { notifyWaitDemo.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程D" ).start(); } }
1 2 3 4 5 6 7 线程B 0 线程A 1 线程B 0 线程A 1 线程B 0 ...... 线程D 0
上述代码中,如果不使用 while ,那么就会出现值大于1或者小于 0 的情况。
lock 实现 代码地址
与 synchronized 相比,lock 中实现释放当前线程资源的方法是 Condition 中的 await 方法。唤醒其他线程的方法是 Condition 中的 signal 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 package com.atguigu.juc;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class LockSingalAwait { private int number = 0 ; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment () throws InterruptedException { lock.lock(); try { while (number !=0 ) { condition.await(); } number++; System.out.println(Thread.currentThread().getName()+"\t" +number); condition.signalAll(); } finally { lock.unlock(); } } public void decrement () throws InterruptedException { lock.lock(); try { while (number !=1 ) { condition.await(); } number--; System.out.println(Thread.currentThread().getName()+"\t" +number); condition.signalAll(); } finally { lock.unlock(); } } public static void main (String[] args) { LockSingalAwait syncNotifyWait = new LockSingalAwait(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { syncNotifyWait.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程A" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { syncNotifyWait.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程B" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { syncNotifyWait.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程C" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { syncNotifyWait.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "线程D" ).start(); } }
线程间定制化通信 代码地址
需求:三个线程 AA,BB,CC,现要求,AA BB CC 按顺序执行,且 AA 打印5次,BB 打印 10次,CC 打印 15 次,然后循环10次。
在 lock 中我们可以唤醒指定的线程,只需要使用 signal 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 package com.atguigu.juc;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class LockSingalAwait2 { private int number = 1 ; private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void print5 (int loopCount) throws InterruptedException { lock.lock(); try { while (number!=1 ){ condition1.await(); } for (int i = 1 ; i <= 5 ; i++) { System.out.println("线程 :" + Thread.currentThread().getName() + " 打印:" + i + "次,当前第:" + loopCount + "轮" ); } number = 2 ; condition2.signal(); } finally { lock.unlock(); } } public void print10 (int loopCount) throws InterruptedException { lock.lock(); try { while (number!=2 ){ condition2.await(); } for (int i = 1 ; i <= 10 ; i++) { System.out.println("线程 :" + Thread.currentThread().getName() + " 打印:" + i + "次,当前第:" + loopCount + "轮" ); } number = 3 ; condition3.signal(); } finally { lock.unlock(); } } public void print15 (int loopCount) throws InterruptedException { lock.lock(); try { while (number!=3 ){ condition3.await(); } for (int i = 1 ; i <= 15 ; i++) { System.out.println("线程 :" + Thread.currentThread().getName() + " 打印:" + i + "次,当前第:" + loopCount + "轮" ); } number = 1 ; condition1.signal(); } finally { lock.unlock(); } } public static void main (String[] args) { LockSingalAwait2 lockSingalAwait2 = new LockSingalAwait2(); new Thread(new Runnable() { @Override public void run () { for (int i = 1 ; i <= 10 ; i++) { try { lockSingalAwait2.print5(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "AA" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 1 ; i <= 10 ; i++) { try { lockSingalAwait2.print10(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "BB" ).start(); new Thread(new Runnable() { @Override public void run () { for (int i = 1 ; i <= 10 ; i++) { try { lockSingalAwait2.print15(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "CC" ).start(); } }
上面代码中 AA 执行完成之后唤醒 BB 执行,BB 执行之后唤醒 CC,最后 CC 又唤醒 AA,直到最后循环 10 次。
线程不安全 Vector、HashTable、Properties 是线程安全的;
ArrayList、LinkedList、HashSet、TreeSet、HashMap、TreeMap 等都是线程不安全的。
List 线程安全 ArrayList 是线程不安全的,我们可以使用 CopyOnWriteArrayList。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.atguigu.juc;import java.util.ArrayList;import java.util.List;import java.util.UUID;public class ArrayListDemo { public static void main (String[] args) { List<String> list = new ArrayList<>(); for (int i = 0 ; i < 10 ; i++) { new Thread(new Runnable() { @Override public void run () { list.add(UUID.randomUUID().toString()); System.out.println(list); } }, String.valueOf(i)).start(); } } }
1 2 3 4 5 6 7 8 java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) at java.util.ArrayList$Itr.next(ArrayList.java:859) at java.util.AbstractCollection.toString(AbstractCollection.java:461) at java.lang.String.valueOf(String.java:2994) at java.io.PrintStream.println(PrintStream.java:821) at com.atguigu.juc.ArrayListDemo$1.run(ArrayListDemo.java:16) at java.lang.Thread.run(Thread.java:748)
解决方法有三种,最好的是第三种,前两种都不推荐使用。代码地址
Vector 解决方法1:Vector。Vector 也是 List 的实现类,但是查看 Vector 的源码发现,Vector 的 add 方法上有 synchronized,所以 Vector 是线程安全的。但是实际使用过程中,并不推荐使用 Vector 因为很老了,而且速度很慢。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static void noSafeList () { List<String> list = new Vector<>(); for (int i = 0 ; i < 10 ; i++) { new Thread(new Runnable() { @Override public void run () { list.add(UUID.randomUUID().toString()); System.out.println(list); } }, String.valueOf(i)).start(); } }
Collections 解决方法2: Collections。 Collections 工具类提供了一些 synchronized方法,例如 synchronizedList,synchronizedMap, synchronizedSet 之类的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static void noSafeList () { List<String> list = Collections.synchronizedList(new ArrayList<>()); for (int i = 0 ; i < 10 ; i++) { new Thread(new Runnable() { @Override public void run () { list.add(UUID.randomUUID().toString()); System.out.println(list); } }, String.valueOf(i)).start(); } }
CopyOnWriteArrayList 解决方法3: CopyOnWriteArrayList。CopyOnWriteArrayList 是 java.util.concurrent 包下的。底层原理是写时复制。对应的还有解决 set 线程不方法的方法 CopyOnWriteArraySet。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void noSafeList () { CopyOnWriteArrayList list = new CopyOnWriteArrayList(); for (int i = 0 ; i < 10 ; i++) { new Thread(new Runnable() { @Override public void run () { list.add(UUID.randomUUID().toString()); System.out.println(list); } }, String.valueOf(i)).start(); } }
CopyOnWriteArrayList 是 arraylist 的一种线程安全变体,其中所有可变操作(add、set等)都是通过生成底层数组的新副本来实现的。
CopyOnWrite 容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器 Object[] 添加,而是先将当前容器 Object[] 进行Copy,复制出一个新的容器 Object[] newElements,然后向新的容器Object[] newElements 里添加元素。添加元素后,再将原容器的引用指向新的容器。这样做的好处是可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。如下图:
其他资料,CopyOnWrite 优缺点
Set 线程安全 HashSet 是线程不安全的,我们可以使用 CopyOnWriteArraySet。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static void noSafeSet () { CopyOnWriteArraySet set = new CopyOnWriteArraySet(); for (int i = 0 ; i < 30 ; i++) { new Thread(new Runnable() { @Override public void run () { set.add(UUID.randomUUID().toString()); System.out.println(set); } }, String.valueOf(i)).start(); } }
HashSet 的底层是 HashMap 实现的,在 HashSet 中 add 新增的值,作为了 HashMap 的 key,value 是一个 PRESENT 常量。
1 2 3 4 5 6 7 public boolean add (E e) { return map.put(e, PRESENT)==null ; } private static final Object PRESENT = new Object();
Map 线程安全 HashMap 也是线程不安全的,我们可以使用 ConcurrentHashMap。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static void noSafeMap () { ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>(); for (int i = 0 ; i < 30 ; i++) { new Thread(new Runnable() { @Override public void run () { map.put(UUID.randomUUID().toString(), Thread.currentThread().getName()); System.out.println(map); } }, String.valueOf(i)).start(); } }
Callable 代码地址
获得多线程的方式主要有四种,之前的继承 Thread 类和实现 Runable 接口。还有现在的实现 Callable 接口和 java 线程池。
Callable 接口和 Runable 的三点不同:
Callable 实现的方法有返回值
Callable 实现的方法有异常
Callable 实现的方法是 call,Runable 实现的方法是 run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.atguigu.juc;import java.util.concurrent.Callable;class UseRunable implements Runnable { @Override public void run () { } } class UseCallable implements Callable { @Override public Integer call () throws Exception { return 200 ; } } public class CallableDemo { public static void main (String[] args) { } }
new Thread()
的时候,可以传入一个 Runable 对象,Callable 是否也可以直接传入呢?答案是不可以的,但是可以借助一个中间人 FutureTask,FutureTask 实现了Runable接口,同时也接受 Callable 类型的实现。这样就能连接起来了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.atguigu.juc;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;class UseRunable implements Runnable { @Override public void run () {} } class UseCallable implements Callable { @Override public Integer call () throws Exception { TimeUnit.SECONDS.sleep(1 ); System.out.println(Thread.currentThread().getName()); return 200 ; } } public class CallableDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask futureTask = new FutureTask<Integer>(new UseCallable()); new Thread(futureTask, "CallableD 方式实现的线程 - A" ).start(); new Thread(futureTask, "CallableD 方式实现的线程 - B" ).start(); while (!futureTask.isDone()){ System.out.println("运算未完成!" ); } System.out.println(futureTask.get()); } }
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 FutureTask 对象在后台完成,当主线程将来需要时,就可以通过 FutureTask 对象获得后台作业的计算结果或者执行状态。仅在 FutureTask 计算完成时才能检索结果;如果计算尚未完成,就调用 get 方法获取结果,那么主进程就会被阻塞。而且一旦 FutureTask 计算完成,就不能再重新开始或取消计算。所以 FutureTask 有两点需要注意:
一个 FutureTask 对象只计算一次。
get 方法需要放到最后,即主线程可以在完成自己的任务后,再去获取结果。(否则未计算完获取结果会被阻塞)
JUC 强大辅助类 CountDownLatch 代码地址
CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这些线程会阻塞。
其它线程调用 countDown 方法会将计数器减1(调用 countDown 方法的线程不会阻塞),当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.atguigu.juc;import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(10 ); for (int i = 0 ; i < 10 ; i++) { new Thread(new Runnable() { @Override public void run () { System.out.println(Thread.currentThread().getName() + "正在执行!!" ); countDownLatch.countDown(); } }, "线程" + String.valueOf(i)).start(); } countDownLatch.await(); System.out.println("循环创建的线程全部执行完毕,我最后执行" ); } }
CyclicBarrier CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过 CyclicBarrier 的 await() 方法。
举例:只有集齐七颗龙珠,才能召唤出神龙。源码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.atguigu.juc;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo { public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7 , new Runnable() { @Override public void run () { System.out.println("集齐七颗龙珠, 可以召唤神龙!!!" ); } }); for (int i = 1 ; i <= 7 ; i++) { new Thread(new Runnable() { @Override public void run () { try { System.out.println(Thread.currentThread().getName() + "被收集" ); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }, "龙珠" + String.valueOf(i)).start(); } } }
1 2 3 4 5 6 7 8 龙珠2被收集 龙珠4被收集 龙珠3被收集 龙珠1被收集 龙珠6被收集 龙珠5被收集 龙珠7被收集 集齐七颗龙珠, 可以召唤神龙!!!
Semaphore Semaphore 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
在信号量上我们定义两种操作:
acquire(获取) 当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
举例:停车场只有三个停车位,但是有六辆车需要停。停车场每辆车停留的时间有限,当停车场有空时,发出信号,外面的车辆可以进入停车场。源码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.atguigu.juc;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class SemaphoreDemo { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); for (int i = 1 ; i <=6 ; i++) { new Thread(new Runnable() { @Override public void run () { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 进入停车场" ); TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(Thread.currentThread().getName() + " 开出停车场" ); semaphore.release(); } } }, "车辆" + String.valueOf(i)).start(); } } }
读写锁 Java 并发库中 ReetrantReadWriteLock 实现了 ReadWriteLock 接口并添加了可重入的特性。ReetrantReadWriteLock 读写锁的效率明显高于 synchronized 关键字。ReetrantReadWriteLock 读写锁的实现中,读锁使用共享模式;写锁使用独占模式,换句话说,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。ReetrantReadWriteLock 读写锁的实现中,需要注意的,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁。
以下代码实例就是没有加读写锁的情况,写操作 set 是一起进行的,并没有独占。代码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.atguigu.juc;import java.util.HashMap;import java.util.Map;import java.util.UUID;import java.util.concurrent.TimeUnit;class MyCache { private volatile Map<String, Object> map = new HashMap<>(); public void set (String key, Object value) { try { System.out.println("线程:" + Thread.currentThread().getName() + "正在写" + key); TimeUnit.MILLISECONDS.sleep(100 ); map.put(key, value); System.out.println("线程:" + Thread.currentThread().getName() + "写入" + key+"完成" ); }catch (Exception e) { e.printStackTrace(); }finally { } } public Object get (String key) { Object result = null ; try { System.out.println("线程:" + Thread.currentThread().getName() + "正在获取" + key + "的值" ); result = map.get(key); System.out.println("线程:" + Thread.currentThread().getName() + "正在获取" + key + "的值结束" ); }catch (Exception e) { e.printStackTrace(); }finally { } return result; } } public class ReadWriteLockDemo { public static void main (String[] args) throws InterruptedException { MyCache myCache = new MyCache(); for (int i = 0 ; i < 5 ; i++) { final int num = i; new Thread(new Runnable() { @Override public void run () { myCache.set(String.valueOf(num), UUID.randomUUID().toString().replace("-" , "" )); } }, String.valueOf(num)).start(); } TimeUnit.MILLISECONDS.sleep(100 ); for (int i = 5 ; i < 10 ; i++) { final int num = i; new Thread(new Runnable() { @Override public void run () { myCache.get(String.valueOf(num)); } }, String.valueOf(num)).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 线程:1正在写1 线程:4正在写4 线程:3正在写3 线程:2正在写2 线程:0正在写0 线程:5正在获取5的值 线程:5正在获取5的值结束 线程:6正在获取6的值 线程:6正在获取6的值结束 线程:7正在获取7的值 线程:7正在获取7的值结束 线程:8正在获取8的值 线程:8正在获取8的值结束 线程:9正在获取9的值 线程:9正在获取9的值结束 线程:4写入4完成 线程:2写入2完成 线程:1写入1完成 线程:3写入3完成 线程:0写入0完成
以下代码是加上读写锁之后的,可以看见写操作是独占的,读操作的并发的。代码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package com.atguigu.juc;import java.util.HashMap;import java.util.Map;import java.util.UUID;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;class MyCache2 { private volatile Map<String, Object> map = new HashMap<>(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void set (String key, Object value) { try { readWriteLock.writeLock().lock(); System.out.println("线程:" + Thread.currentThread().getName() + "正在写" + key); TimeUnit.MILLISECONDS.sleep(100 ); map.put(key, value); System.out.println("线程:" + Thread.currentThread().getName() + "写入" + key+"完成" ); }catch (Exception e) { e.printStackTrace(); }finally { readWriteLock.writeLock().unlock(); } } public Object get (String key) { Object result = null ; try { readWriteLock.readLock().lock(); System.out.println("线程:" + Thread.currentThread().getName() + "正在获取" + key + "的值" ); result = map.get(key); System.out.println("线程:" + Thread.currentThread().getName() + "正在获取" + key + "的值结束" ); }catch (Exception e) { e.printStackTrace(); }finally { readWriteLock.readLock().unlock(); } return result; } } public class ReadWriteLockDemo2 { public static void main (String[] args) throws InterruptedException { MyCache2 myCache2 = new MyCache2(); for (int i = 0 ; i < 5 ; i++) { final int num = i; new Thread(new Runnable() { @Override public void run () { myCache2.set(String.valueOf(num), UUID.randomUUID().toString().replace("-" , "" )); } }, String.valueOf(num)).start(); } TimeUnit.MILLISECONDS.sleep(100 ); for (int i = 5 ; i < 10 ; i++) { final int num = i; new Thread(new Runnable() { @Override public void run () { myCache2.get(String.valueOf(num)); } }, String.valueOf(num)).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 线程:0正在写0 线程:0写入0完成 线程:1正在写1 线程:1写入1完成 线程:4正在写4 线程:4写入4完成 线程:2正在写2 线程:2写入2完成 线程:3正在写3 线程:3写入3完成 线程:5正在获取5的值 线程:5正在获取5的值结束 线程:6正在获取6的值 线程:7正在获取7的值 线程:9正在获取9的值 线程:9正在获取9的值结束 线程:8正在获取8的值 线程:8正在获取8的值结束 线程:6正在获取6的值结束 线程:7正在获取7的值结束
现象变成了写操作是独占的了,也就是一个写操作完成之后在进行下一个写操作。读操作还是并发的
阻塞队列 简介 线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素,当队列是空的,从队列中获取元素的操作将会被阻塞。当队列是满的,从队列中添加元素的操作将会被阻塞,试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增。
为什么需要 BlockingQueue?好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
实现类
ArrayBlockingQueue :由数组结构组成的有界阻塞队列。(需要指定大小)
LinkedBlockingQueue :由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
SynchronousQueue :不存储元素的阻塞队列,也即单个元素的队列。
LinkedTransferQueue:由链表组成的无界阻塞队列。
LinkedBlockingDeque:由链表组成的双向阻塞队列。
125标记为粗体的比较常用。
常用方法
代码实例 代码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package com.atguigu.juc;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;public class BlockingQueueDemo { public static void main (String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3 ); System.out.println(blockingQueue.offer("a" , 4 , TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b" , 4 , TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c" , 4 , TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(4 , TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(4 , TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(4 , TimeUnit.SECONDS)); } }
线程池(非常重要) 获得多线程的方式主要有四种,之前的继承 Thread 类和实现 Runable 接口。还有现在的实现 Callable 接口和 java 线程池。
简介 线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。它的主要特点为:线程复用;控制最大并发数;管理线程。
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类
具体实现 左边的 Executors 是工具类。这里类比 List 对象的创建,既可以用 new ArrayList ,也可以使用 Arrays.asList。同理创建线程池可以使用 Executors 工具类,也可以使用 new 的方式。
Executors.newFixedThreadPool(int) : 创建一个线程池,一池有N个固定的线程,有固定线程数的线程。执行长期任务性能好。
1 2 3 4 5 6 7 8 9 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
Executors.newSingleThreadExecutor():一个任务一个任务的执行,一池一线程。
1 2 3 4 5 6 7 8 9 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Executors.newCachedThreadPool():执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强。即自动扩充。
1 2 3 4 5 6 7 8 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
使用示例:代码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.atguigu.juc;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPoolDemo { public static void main (String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); try { for (int i = 0 ; i < 30 ; i++) { threadPool.execute(new Runnable() { @Override public void run () { System.out.println(Thread.currentThread().getName() + "执行了任务!" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPool.shutdown(); } } }
ThreadPoolExecutor FixedThreadPool, SingleThreadExecutor,CachedThreadPool 底层都是调用 ThreadPoolExecutor 的,只是参数不同而已。ThreadPoolExecutor 的参数有七个,列举如下:
corePoolsize: 线程池中常驻的核心线程数。核心线程是懒加载的,也就是说例如 corePoolsize 是 5,new 的时候并没有真正的创建出 5 个线程出来,而是在调用 execute 方法的时候才创建。
maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
keepAliveTime:多余的空闲线程的存活时间,当前池中线程数量超过 corePoolSize 时,当空闲时间达到keepAliveTime 时,多余线程会被销毁直到只剩下 corePoolSize 个线程为止。
unit:keepAliveTime的单位
workQueue:任务队列,被提交但尚未被执行的任务
threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的 runnable 的策略。
线程池底层工作原理
在创建了线程池后,线程池中的线程数为零。
当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:
如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务,注意⚠️这里不是将它放入队列,因为队列已经满了,而是马上创建线程来执行这个任务,创建的线程是直接执行这个任务,不去队列里面取任务。当执行完这个任务,才回去队列里面取任务。
如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
哪种方式创建线程池最好 答案是一个都不用,只使用自定义的!!!
线程池拒绝策略
AbortPolicy(默认):直接抛出 RejectedExecutionException 异常阻止系统正常运行。即当核心线程数满后,且阻塞队列也满后,如果是该策略,那么会抛出异常。
CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。 即当核心线程数满后,且阻塞队列也满后,如果是该策略,任务从哪儿来,就回到哪儿去。
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。即当核心线程数满后,且阻塞队列也满后,如果是该策略,那么新的线程任务会替换掉队列中的最老的任务。
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。即当核心线程数满后,且阻塞队列也满后,如果是该策略,任务直接就丢失了。
自定义线程池 代码地址
AbortPolicy 上面了解到,实际生产环境中,我们并不会去用 Excutors 来创建线程池,一般都是自己使用 ThreadPoolExecutor 来实现。下面就是示例,并且用到了各个拒绝策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.atguigu.juc;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class MyThreadPool { public static void main (String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 5 , 3L , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { for (int i = 1 ; i <= 2 ; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run () { System.out.println("线程" + Thread.currentThread().getName() +"执行" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); } } }
上面使用的是 AbortPolicy 拒绝策略。当 for 循环中的 i<= 5 的时候,都是核心线程在执行。原因是 5 大于核心线程数,大于后,会将任务放入阻塞队列中,阻塞队列大小是3,正好将 3 个放进去。(类比去银行办理业务,只有两个柜台在办理,三个等候的位置,那么来五个人是核心线程可以处理的)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 try { for (int i = 1 ; i <= 5 ; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run () { System.out.println("线程:" + Thread.currentThread().getName() + "执行" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); }
1 2 3 4 5 线程:pool-1-thread-2执行 // 两个核心线程就能完成工作 线程:pool-1-thread-1执行 线程:pool-1-thread-2执行 线程:pool-1-thread-1执行 线程:pool-1-thread-2执行
此时,如果将 for 循环中的 i 的值改为 6 的时候,核心线程就处理不过来了,因为核心线程数为 2,队列里面最多可以阻塞3个,还剩1个任务没有地方处理,但是现在线程数量没有达到最大的线程数,此时可以启动新的线程来处理任务,只要保证线程的最大数量不能超过 5 个,即 核心线程 + 新的非核心线程 <= 5 即可。所以 i 的值为 6-8 之间都需要创建新的线程来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 try { for (int i = 1 ; i <= 8 ; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run () { System.out.println("线程:" + Thread.currentThread().getName() + "执行" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); }
1 2 3 4 5 6 7 8 线程:pool-1-thread-1执行 线程:pool-1-thread-3执行 // 两个线程已经不能完成任务了,又创建了 3,4,5 三个非核心线程来帮忙 线程:pool-1-thread-1执行 线程:pool-1-thread-2执行 线程:pool-1-thread-1执行 线程:pool-1-thread-3执行 线程:pool-1-thread-4执行 线程:pool-1-thread-5执行
但是一旦超过 8,假设 i 的值为 9,此时核心线程为2,队列阻塞3个,创建新线程运行3个,那就还剩下一个任务没有被处理,那么此时就会走拒绝策略了。而此时的拒绝策略是 AbortPolicy,所以直接抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 try { for (int i = 1 ; i <= 9 ; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run () { System.out.println("线程:" + Thread.currentThread().getName() + "执行" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 线程:pool-1-thread-1执行 线程:pool-1-thread-4执行 线程:pool-1-thread-3执行 线程:pool-1-thread-2执行 线程:pool-1-thread-3执行 线程:pool-1-thread-5执行 线程:pool-1-thread-4执行 线程:pool-1-thread-1执行 java.util.concurrent.RejectedExecutionException: Task com.atguigu.juc.MyThreadPool$1@66d3c617 rejected from java.util.concurrent.ThreadPoolExecutor@63947c6b[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.atguigu.juc.MyThreadPool.main(MyThreadPool.java:22)
CallerRunsPolicy 当任务数量 i 可以被处理时,处理的方法和上面 AbortPolicy 分析一样的,这里只说明当任务数量不能被处理,运行 CallerRunsPolicy 的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.atguigu.juc;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class MyThreadPool { public static void main (String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 5 , 3L , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); try { for (int i = 1 ; i <= 33 ; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run () { System.out.println("线程:" + Thread.currentThread().getName() + "执行" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); } } }
1 2 3 4 5 6 7 8 9 10 线程:pool-1-thread-1执行 线程:pool-1-thread-4执行 线程:pool-1-thread-3执行 线程:pool-1-thread-5执行 线程:pool-1-thread-2执行 线程:pool-1-thread-3执行 线程:pool-1-thread-4执行 线程:pool-1-thread-1执行 线程:main执行 // 当处理不了的时候,从哪儿来就回哪儿去 ......
DiscardOldestPolicy 当任务数量 i 可以被处理时,处理的方法和上面 AbortPolicy 分析一样的,这里只说明当任务数量不能被处理,运行 DiscardOldestPolicy 的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.atguigu.juc;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class MyThreadPool { public static void main (String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 5 , 3L , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy() ); try { for (int i = 1 ; i <= 33 ; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run () { System.out.println("线程:" + Thread.currentThread().getName() + "执行" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 线程:pool-1-thread-1执行 线程:pool-1-thread-3执行 线程:pool-1-thread-1执行 线程:pool-1-thread-2执行 线程:pool-1-thread-3执行 线程:pool-1-thread-2执行 线程:pool-1-thread-5执行 线程:pool-1-thread-4执行 线程:pool-1-thread-1执行 线程:pool-1-thread-4执行 线程:pool-1-thread-4执行 线程:pool-1-thread-4执行 线程:pool-1-thread-5执行 线程:pool-1-thread-2执行 线程:pool-1-thread-3执行 线程:pool-1-thread-1执行 线程:pool-1-thread-4执行
有一部分老的任务没有执行,被新的任务给替代了
DiscardPolicy 当任务数量 i 可以被处理时,处理的方法和上面 AbortPolicy 分析一样的,这里只说明当任务数量不能被处理,运行 DiscardPolicy 的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.atguigu.juc;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class MyThreadPool { public static void main (String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 5 , 3L , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() ); try { for (int i = 1 ; i <= 33 ; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run () { System.out.println("线程:" + Thread.currentThread().getName() + "执行" ); } }); } }catch (Exception e) { e.printStackTrace(); }finally { threadPoolExecutor.shutdown(); } } }
1 2 3 4 5 6 7 8 9 10 11 线程:pool-1-thread-1执行 线程:pool-1-thread-3执行 线程:pool-1-thread-2执行 线程:pool-1-thread-5执行 线程:pool-1-thread-2执行 线程:pool-1-thread-3执行 线程:pool-1-thread-4执行 线程:pool-1-thread-1执行 线程:pool-1-thread-3执行 线程:pool-1-thread-2执行 线程:pool-1-thread-5执行
尽自己最大的努力执行,能执行多少就执行多少。
分支合并框架 异步回调